Toni Verbeiren
DiFlow (Toni)
Break
DNAnexus (Maté)
Single Cell Pipeline and NextFlow pipeline platforms (Dries)
Introduction
DiFlow
A note (addendum training last week)
Step by step
Putting it all together
Extra
Makefile-based approach

In computer programming, dataflow programming is a programming paradigm that models a program as a directed graph of the data flowing between operations…
I invite you to take a look at
https://github.com/nf-core/rnaseq/blob/master/rnaseq.nf
And this is already using DSL2!
We want the flow to be expressed explicitly and concisely:
input_ \
| ... \
| ( parse_header & parse_map ) \
| join \
| ... \
| plot_map \
| convert_plot \
| rename \
| toSortedList{ a,b -> a[0] <=> b[0] } \
| ... \
| combine_plots
Think about apply in R, map in Python, Javascript, etc.
map
is a function that transforms one
Set, List, …
into another
Set, List, …
This can also be events in time:
D[SL2] I[mprovement] Flow
is
An abstraction layer for NextFlow DSL2
A set of conventions
Tooling
[ ID, data, config ]
modules/fastqc_4.nf
modules/multiqc.nf used in pipeline 03-first-pipeline/dsl2-RNAseq.nf
process
map is synchronous
ID
Map to store parameters
// Step - 7
// Add or subtract a term from each input value, driven by a per-item
// config map ([ "operator" : ..., "term" : ... ]) carried in the tuple.
process addTupleWithMap {
input:
tuple val(id), val(input), val(config)
output:
tuple val("${id}"), val(output)
exec:
if (config.operator == "+") {
output = input + config.term
} else {
output = input - config.term
}
}
// Demo: turn each raw value into an [ id, data, config ] triplet and feed
// the triplets through addTupleWithMap.
workflow step7 {
Channel.from( [ 1, 2, 3 ] ) \
| map{ el ->
[
el.toString(),
el,
[ "operator" : "-", "term" : 10 ]
] } \
| addTupleWithMap \
| view{ it }
}Map with a process-key// Step - 8
// Like addTupleWithMap, but the parameters live under a key named after
// the process, so a single config map can carry settings for many processes.
process addTupleWithProcessHash {
input:
tuple val(id), val(input), val(config)
output:
tuple val("${id}"), val(output)
exec:
def conf = config.addTupleWithProcessHash
if (conf.operator == "+") {
output = input + conf.term
} else {
output = input - conf.term
}
}
// Demo: the per-item config is namespaced by process name, so each process
// looks up only its own sub-map.
workflow step8 {
Channel.from( [ 1, 2, 3 ] ) \
| map{ el ->
[
el.toString(),
el,
[ "addTupleWithProcessHash" :
[
"operator" : "-",
"term" : 10
]
]
] } \
| addTupleWithProcessHash \
| view{ it }
}ConfigMap with a shell script// Step - 9
// Shell-script variant of the process-key pattern: the operation runs via
// `expr` and the result is captured from stdout.
process addTupleWithProcessHashScript {
input:
tuple val(id), val(input), val(config)
output:
tuple val("${id}"), stdout
script:
// Look up this process's own sub-map in the namespaced config.
def thisConf = config.addTupleWithProcessHashScript
def operator = thisConf.operator
def term = thisConf.term
// Fix: use the `term` local consistently (it was defined but unused;
// the template read ${thisConf.term} directly). Behavior is unchanged.
"""
echo \$( expr $input $operator $term )
"""
}
// Demo: same namespaced config, but the process executes a shell script.
workflow step9 {
Channel.from( [ 1, 2, 3 ] ) \
| map{ el ->
[
el.toString(),
el,
[ "addTupleWithProcessHashScript" :
[
"operator" : "-",
"term" : 10
]
]
] } \
| addTupleWithProcessHashScript \
| view{ it }
}// Step - 10
// Adds `term` to `input`; id and term pass through unchanged so the
// output triplet can be chained directly into another such process.
process process_step10a {
input:
tuple val(id), val(input), val(term)
output:
tuple val("${id}"), val(output), val("${term}")
exec:
output = input.toInteger() + term.toInteger()
}
// Subtracts `term` from `input`; same pass-through triplet shape as
// process_step10a, enabling chaining.
process process_step10b {
input:
tuple val(id), val(input), val(term)
output:
tuple val("${id}"), val(output), val("${term}")
exec:
output = input.toInteger() - term.toInteger()
}
// Demo: chaining two processes that share the [ id, value, term ] shape.
workflow step10 {
Channel.from( [ 1, 2, 3 ] ) \
| map{ el -> [ el.toString(), el, 10 ] } \
| process_step10a \
| process_step10b \
| view{ it }
}// Step - 11
// Applies config.operator/config.term to the input value and passes the
// config through, so the same process can be invoked repeatedly.
process process_step11 {
input:
tuple val(id), val(input), val(config)
output:
tuple val("${id}"), val(output), val("${config}")
exec:
output = (config.operator == "+")
? input.toInteger() + config.term.toInteger()
: input.toInteger() - config.term.toInteger()
}
// Demo: reconfigure between two invocations of the same process by mapping
// a new config onto the stream.
// NOTE(review): the first call receives an empty config ([ : ]), so
// config.operator/config.term are null inside the process — confirm this
// first stage is intended to behave that way.
workflow step11 {
Channel.from( [ 1, 2, 3 ] ) \
| map{ el -> [ el.toString(), el, [ : ] ] } \
| process_step11 \
| map{ id, value, config ->
[
id,
value,
[ "term" : 11, "operator" : "-" ]
] } \
| process_step11 \
| view{ [ it[0], it[1] ] }
}// Step - 11a
// Import the same process twice under different aliases: a DSL2 process
// may appear only once per workflow graph, so aliasing allows reuse.
include { process_step11 as process_step11a } \
from './examples/modules/step11.nf'
include { process_step11 as process_step11b } \
from './examples/modules/step11.nf'
// Demo: run the aliased process twice with different configs
// (first add 5, then subtract 11).
workflow step11a {
Channel.from( [ 1, 2, 3 ] ) \
| map{ el -> [ el.toString(), el, [ : ] ] } \
| map{ id, value, config ->
[
id,
value,
[ "term" : 5, "operator" : "+" ]
] } \
| process_step11a \
| map{ id, value, config ->
[
id,
value,
[ "term" : 11, "operator" : "-" ]
] } \
| process_step11b \
| view{ [ it[0], it[1] ] }
}// Step - 12
// Sums a collection: expects `input` to be a list (e.g. values gathered
// with toList/collect upstream); `term` passes through unchanged.
process process_step12 {
input:
tuple val(id), val(input), val(term)
output:
tuple val("${id}"), val(output), val("${term}")
exec:
output = input.sum()
}
// Demo: gather all per-item results with toList, collect just the values,
// and feed a single [ "sum", [values], config ] triplet to the summing step.
workflow step12 {
Channel.from( [ 1, 2, 3 ] ) \
| map{ el -> [ el.toString(), el, 10 ] } \
| process_step10a \
| toList \
| map{
[
"sum",
it.collect{ id, value, config -> value },
[ : ]
] } \
| process_step12 \
| view{ [ it[0], it[1] ] }
}// Step - 13
// Reads a number from the input file and applies config.operator with
// config.term, writing the result to output.txt.
process process_step13 {
input:
tuple val(id), file(input), val(config)
output:
tuple val("${id}"), file("output.txt"), val("${config}")
script:
// Fix: the operator was hard-coded to "+", silently ignoring the
// "operator" : "-" the workflow passes in config. Fall back to "+"
// when no operator is provided, keeping old configs working.
"""
a=`cat $input`
let result="\$a ${config.operator ?: "+"} ${config.term}"
echo "\$result" > output.txt
"""
}
// Demo: file-based input — build triplets keyed by the file's base name.
workflow step13 {
Channel.fromPath( params.input ) \
| map{ el ->
[
el.baseName.toString(),
el,
[ "operator" : "-", "term" : 10 ]
]} \
| process_step13 \
| view{ [ it[0], it[1] ] }
}// Step - 14
// Same as process_step13, but publishes output.txt to output/.
process process_step14 {
publishDir "output/"
input:
tuple val(id), file(input), val(config)
output:
tuple val("${id}"), file("output.txt"), val("${config}")
script:
// Fix: honour config.operator (previously hard-coded "+", ignoring the
// "-" the workflow passes); default to "+" when absent.
"""
a=`cat $input`
let result="\$a ${config.operator ?: "+"} ${config.term}"
echo "\$result" > output.txt
"""
}
// Demo: identical flow to step13; the process now publishes its result.
workflow step14 {
Channel.fromPath( params.input ) \
| map{ el ->
[
el.baseName.toString(),
el,
[ "operator" : "-", "term" : 10 ]
]} \
| process_step14 \
| view{ [ it[0], it[1] ] }
}// Step - 15
// Same computation, but publishes under a per-item directory taken from
// config.id, avoiding collisions between samples.
process process_step15 {
publishDir "output/${config.id}"
input:
tuple val(id), file(input), val(config)
output:
tuple val("${id}"), file("output.txt"), val("${config}")
script:
// Fix: honour config.operator (previously hard-coded "+", ignoring the
// "-" the workflow passes); default to "+" when absent.
"""
a=`cat $input`
let result="\$a ${config.operator ?: "+"} ${config.term}"
echo "\$result" > output.txt
"""
}
// Demo: the id is duplicated into the config so the process itself can use
// it (e.g. for the per-item publishDir).
workflow step15 {
Channel.fromPath( params.input ) \
| map{ el ->
[
el.baseName,
el,
[
"id": el.baseName,
"operator" : "-",
"term" : 10
]
] } \
| process_step15 \
| view{ [ it[0], it[1] ] }
}params?params// Step - 17
// Output filename comes from the global params.output — note every
// invocation writes to the same name (contrast with step 18).
process process_step17 {
publishDir "output"
input:
tuple val(id), file(input), val(config)
output:
tuple val("${id}"), file(params.output), val("${config}")
script:
// Fix: honour config.operator (previously hard-coded "+", ignoring the
// "-" the workflow passes); default to "+" when absent.
"""
a=`cat $input`
let result="\$a ${config.operator ?: "+"} ${config.term}"
echo "\$result" > ${params.output}
"""
}
// Demo: global params drive the output filename inside the process.
workflow step17 {
Channel.fromPath( params.input ) \
| map{ el ->
[
el.baseName.toString(),
el,
[
"id": el.baseName,
"operator" : "-",
"term" : 10
]
] } \
| process_step17 \
| view{ [ it[0], it[1] ] }
}// Step - 18
// Output filename comes from the per-item config instead of global params,
// so each item can write to a distinct file.
process process_step18 {
publishDir "output"
input:
tuple val(id), file(input), val(config)
output:
tuple val("${id}"), file("${config.output}"), val("${config}")
script:
// Fix: honour config.operator (previously hard-coded "+", ignoring the
// "-" the workflow passes); default to "+" when absent.
"""
a=`cat $input`
let result="\$a ${config.operator ?: "+"} ${config.term}"
echo "\$result" > ${config.output}
"""
}
// Demo: derive a per-item output filename and carry it in the config.
workflow step18 {
Channel.fromPath( params.input ) \
| map{ el -> [
el.baseName.toString(),
el,
[
"output" : "output_from_${el.baseName}.txt",
"id": el.baseName,
"operator" : "-",
"term" : 10
]
]} \
| process_step18 \
| view{ [ it[0], it[1] ] }
}// Step - 19
// Derive an output filename from an input file, e.g. foo.txt -> foo-out.txt.
def out_from_in = { it -> it.baseName + "-out.txt" }
// Output filename is derived from the input filename by the out_from_in
// closure, computed in the script block and reused in the output clause.
process process_step19 {
publishDir "output"
input:
tuple val(id), file(input), val(config)
output:
tuple val("${id}"), file("${out}"), val("${config}")
script:
out = out_from_in(input)
// Fix: honour config.operator (previously hard-coded "+", ignoring the
// "-" the workflow passes); default to "+" when absent.
"""
a=`cat $input`
let result="\$a ${config.operator ?: "+"} ${config.term}"
echo "\$result" > ${out}
"""
}
// Demo: output naming is fully handled inside the process via out_from_in.
workflow step19 {
Channel.fromPath( params.input ) \
| map{ el -> [
el.baseName.toString(),
el,
[
"id": el.baseName,
"operator" : "-",
"term" : 10
]
]} \
| process_step19 \
| view{ [ it[0], it[1] ] }
}// Step - 20a
// Divides the first element of the `input` collection by the second.
// NOTE(review): assumes `input` has at least two elements and the second
// is non-zero — confirm upstream always guarantees this.
process process_step20 {
input:
tuple val(id), val(input), val(term)
output:
tuple val("${id}"), val(output), val("${term}")
exec:
output = input[0] / input[1]
}
// Demo: gather two processed values and divide one by the other.
// NOTE(review): the id label "sum" is carried over from step 12 although
// process_step20 divides — presumably a leftover; harmless but misleading.
workflow step20a {
Channel.from( [ 1, 2 ] ) \
| map{ el -> [ el.toString(), el, 10 ] } \
| process_step10a \
| toList \
| map{ [
"sum",
it.collect{ id, value, config -> value },
[ : ]
] } \
| process_step20 \
| view{ [ it[0], it[1] ] }
}// Step - 22
// Runs the command line provided in config.cli verbatim and publishes the
// file named by config.output.
// NOTE(review): this executes arbitrary text from the config as shell
// code — only safe with fully trusted configuration.
process process_step22 {
publishDir "output"
input:
tuple val(id), file(input), val(config)
output:
tuple val("${id}"), file("${config.output}"), val("${config}")
script:
"""
${config.cli}
"""
}
// Demo: the whole command line travels with the data as config.cli.
workflow step22 {
Channel.fromPath( params.input ) \
| map{ el -> [
el.baseName.toString(),
el,
[
"cli": "cat input.txt > output22.txt",
"output": "output22.txt"
]
]} \
| process_step22 \
| view{ [ it[0], it[1] ] }
}
//- - -params {
...
cellranger {
name = "cellranger"
container = "mapping/cellranger:4.0.0-1"
command = "cellranger"
arguments {
mode {
name = "mode"
otype = "--"
description = "The run mode"
value = "count"
...
}
input {
name = "input"
otype = "--"
description = "Path of folder created by mkfastq or bcl2fastq"
required = false
...
}
id {
name = "id"
otype = "--"
description = "Output folder for the count files."
value = "cr"
required = false
...
}
...
}
...
}

workflow instead of process

process cellranger_process {
...
container "${params.dockerPrefix}${container}"
publishDir "${params.output}/processed_data/${id}/", mode: 'copy', overwrite: true
input:
tuple val(id), path(input), val(output), val(container), val(cli)
output:
tuple val("${id}"), path("${output}")
script:
"""
export PATH="${moduleDir}:\$PATH"
$cli
"""
}

combine_plots

combine_plots/
main.nf
nextflow.config
[script(s)]
nextflow.config:
params {
combine_plots__input = "/path/to/my/dir"
combine_plots__output = "output.webm"
combine_plots__framerate = "4"
id = ""
dockerPrefix = ""
input = ""
output = ""
combine_plots {
name = "combine_plots"
container = "jrottenberg/ffmpeg:latest"
command = "combine_plots"
# ...
arguments {
input {
name = "input"
otype = "--"
description = "A list of images."
value = "${params.combine_plots__input}"
required = true
type = "file"
# ...
}
output {
name = "output"
otype = "--"
description = "A path to output the movie to."
value = "${params.combine_plots__output}"
required = true
type = "file"
# ...
}
framerate {
name = "framerate"
otype = "--"
description = "Number of frames per second."
value = "${params.combine_plots__framerate}"
required = false
type = "integer"
# ...
}
}
}
}main.nf:
nextflow.preview.dsl=2
import java.nio.file.Paths
// ... checks
def renderCLI(command, arguments) {
// ... based on nextflow.config, render CLI
}
// files is either String, List[String] or HashMap[String,String]
def outFromIn(files) {
// ... derive output from input filename
}
// In: Hashmap key -> DataObjects
// Out: Arrays of DataObjects
def overrideInput(params, str) {
// ... internal consistency
}
def overrideOutput(params, str) {
// ... internal consistency
}
// Generated wrapper process (see note below: these files are generated, not
// hand-written): runs a pre-rendered CLI string inside the component's
// container and publishes the declared output per id.
process combine_plots_process {
// ...
container "${params.dockerPrefix}${container}"
publishDir "${params.output}/${id}/", mode: 'copy', overwrite: true
input:
tuple val(id), path(input), val(output), val(container), val(cli)
output:
tuple val("${id}"), path("${output}")
script:
"""
# ...
$cli
"""
}
workflow combine_plots {
take:
id_input_params_
main:
def key = "combine_plots"
// ... workflow logic
emit:
result_
}
workflow {
// ... Testing procedure(s)
}We don’t write these files ourselves, they are generated!
combine_plots/config.vsh.yaml:
functionality:
name: combine_plots
namespace: civ6_save_renderer
description: Combine multiple images into a movie using ffmpeg.
arguments:
- name: "--input"
alternatives: [-i]
type: file
required: true
default: "/path/to/my/dir"
must_exist: true
multiple: true
description: A list of images.
- name: "--output"
alternatives: [-o]
type: file
required: true
default: "output.webm"
direction: output
description: A path to output the movie to.
- name: "--framerate"
alternatives: [-f]
type: integer
default: 4
description: Number of frames per second.
resources:
- type: bash_script
path: script.sh
platforms:
- type: docker
image: jrottenberg/ffmpeg
- type: nextflow
image: jrottenberg/ffmpeg
publish: true
- type: nativecombine_plots/script.sh:
civ6_postgame
https://github.com/data-intuitive/viash_docs/tree/master/examples/civ6_postgame
Original bash script:
#!/bin/bash
# Render each Civ6 save file to a PNG plot via parse -> plot -> convert,
# then list the PNGs for the final combine step.
# run beforehand:
# viash ns build -P docker --setup
# viash ns build -P nextflow
BIN=target/docker/civ6_save_renderer
input_dir="data"
output_dir="output"
mkdir -p "$output_dir"

# Print a highlighted (green) progress message.
function msg {
  # Fix: use \033[0m consistently (was \e[0m, mixing escape styles).
  echo -e "\033[32m>>>>>>> $1\033[0m"
}

for save_file in "$input_dir"/*.Civ6Save; do
  # Fix: quote expansions so filenames with spaces survive word splitting.
  file_basename=$(basename "$save_file")
  yaml_file="$output_dir/${file_basename/Civ6Save/yaml}"
  tsv_file="$output_dir/${file_basename/Civ6Save/tsv}"
  pdf_file="$output_dir/${file_basename/Civ6Save/pdf}"
  png_file="$output_dir/${file_basename/Civ6Save/png}"
  # Each step is skipped when its output already exists (simple caching).
  if [ ! -f "$yaml_file" ]; then
    msg "parse header '$save_file'"
    $BIN/parse_header/parse_header -i "$save_file" -o "$yaml_file"
  fi
  if [ ! -f "$tsv_file" ]; then
    msg "parse map '$save_file'"
    $BIN/parse_map/parse_map -i "$save_file" -o "$tsv_file"
  fi
  if [ ! -f "$pdf_file" ]; then
    msg "plot map '$save_file'"
    $BIN/plot_map/plot_map -y "$yaml_file" -t "$tsv_file" -o "$pdf_file"
  fi
  if [ ! -f "$png_file" ]; then
    msg "convert plot '$save_file'"
    $BIN/convert_plot/convert_plot -i "$pdf_file" -o "$png_file"
  fi
done
# Colon-separated list of generated PNGs, as expected by combine_plots -i.
png_inputs=`find "$output_dir" -name "*.png" | sed "s#.*#&:#" | tr -d '\n' | sed 's#:$#\n#'`
msg "combine plots"
$BIN/combine_plots/combine_plots -i "$png_inputs" -o "$output_dir/movie.webm" --framerate 1

nextflow.config
includeConfig 'target/nextflow/civ6_save_renderer/plot_map/nextflow.config'
includeConfig 'target/nextflow/civ6_save_renderer/combine_plots/nextflow.config'
includeConfig 'target/nextflow/civ6_save_renderer/convert_plot/nextflow.config'
includeConfig 'target/nextflow/civ6_save_renderer/parse_header/nextflow.config'
includeConfig 'target/nextflow/civ6_save_renderer/parse_map/nextflow.config'
docker {
runOptions = "-i -v ${baseDir}:${baseDir}"
}main.nf
// Pipeline entry file: import the generated viash modules plus a local
// rename utility.
// NOTE(review): `include name from '...' params(params)` is the pre-release
// DSL2 syntax (matches the DSL-2-preview warning in the run logs below);
// current Nextflow uses `include { name } from '...'`.
nextflow.preview.dsl=2
import java.nio.file.Paths
include plot_map from './target/nextflow/civ6_save_renderer/plot_map/main.nf' params(params)
include combine_plots from './target/nextflow/civ6_save_renderer/combine_plots/main.nf' params(params)
include convert_plot from './target/nextflow/civ6_save_renderer/convert_plot/main.nf' params(params)
include parse_header from './target/nextflow/civ6_save_renderer/parse_header/main.nf' params(params)
include parse_map from './target/nextflow/civ6_save_renderer/parse_map/main.nf' params(params)
include rename from './src/utils.nf'
// Main pipeline: for each save file, parse header and map in parallel,
// join the two results by id, plot, convert, then combine all plots.
workflow {
if (params.debug == true)
println(params)
if (!params.containsKey("input") || params.input == "") {
exit 1, "ERROR: Please provide a --input parameter pointing to .Civ6Save file(s)"
}
def input_ = Channel.fromPath(params.input)
// Collapse the gathered list into one [ "all", [files], params ] triplet.
def listToTriplet = { it -> [ "all", it.collect{ a -> a[1] }, params ] }
input_ \
| map{ it -> [ it.baseName , it ] } \
| map{ it -> [ it[0] , it[1], params ] } \
| ( parse_header & parse_map ) \
| join \
| map{ id, parse_headerOut, params1, parse_mapOut, params2 ->
[ id, [ "yaml" : parse_headerOut, "tsv": parse_mapOut ], params1 ] } \
| plot_map \
| convert_plot \
| rename \
| toSortedList{ a,b -> a[0] <=> b[0] } \
| map( listToTriplet ) \
| combine_plots
}Running the pipeline
// Joins two values by taking a tuple from EACH of two input channels.
// NOTE(review): the channels are consumed in arrival order, not matched by
// id — the sample run below pairs id 1 with "b" (see [1, 11/b]), which is
// exactly the pitfall this example demonstrates.
process process_join_process {
input:
tuple val(id1), val(in1)
tuple val(id2), val(in2)
output:
tuple val("${id1}"), val(out)
exec:
out = in1 + "/" + in2
}
// Demo of joining via a two-channel process (order-based, id-unsafe).
// NOTE(review): addTuple is defined elsewhere in the project, not visible
// in this file.
workflow join_process {
ch1_ = Channel.from( ["1", "2", "3", "4" ] ) | map{ [ it, it.toString() ] } \
| addTuple
ch2_ = Channel.fromList( [ ["1", "a"] , ["2", "b"], ["3", "c"], ["4", "d"] ] )
process_join_process(ch1_, ch2_) \
| toSortedList | view
}N E X T F L O W ~ version 20.10.0
Launching `./main.nf` [berserk_bohr] - revision: 4ff07776e0
WARN: DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
executor > local (8)
[50/e255a0] process > join_process:addTuple (4) [100%] 4 of 4 ✔
[c4/6cc46b] process > join_process:process_join_process (4) [100%] 4 of 4 ✔
[[1, 11/b], [2, 21/a], [3, 31/c], [4, 41/d]]
// Concatenates the left and right values of a pre-joined map — the id-safe
// counterpart to process_join_process: pairing happens upstream by key.
process process_join_stream {
input:
tuple val(id), val(inMap)
output:
tuple val("$id"), val(out)
exec:
out = inMap.left + inMap.right
}
// Demo of id-safe joining: Channel.join matches items by their first
// element (the id) before the process runs, so pairs cannot mismatch
// (compare the correctly aligned output below with join_process above).
workflow join_stream {
ch1_ = Channel.from( ["1", "2", "3", "4" ] ) | map{ [ it, it.toString() ] } \
| addTuple
ch2_ = Channel.fromList( [ ["1", "a"] , ["2", "b"], ["3", "c"], ["4", "d"] ] )
ch1_.join(ch2_) \
| map{ id, left, right -> [ id, [ "left" : left, "right" : right ] ] } \
| process_join_stream \
| toSortedList \
| view
}N E X T F L O W ~ version 20.10.0
Launching `./main.nf` [gigantic_bassi] - revision: 4ff07776e0
WARN: DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
executor > local (8)
[22/57f9ae] process > join_stream:addTuple (1) [100%] 4 of 4 ✔
[9a/956c37] process > join_stream:process_join_stream (4) [100%] 4 of 4 ✔
[[1, 11a], [2, 21b], [3, 31c], [4, 41d]]